Crate rumqttc[−][src]
Expand description
A pure rust MQTT client which strives to be robust, efficient and easy to use. This library is backed by an async (tokio) eventloop which handles all the robustness and and efficiency parts of MQTT but naturally fits into both sync and async worlds as we’ll see
Let’s jump into examples right away
A simple synchronous publish and subscribe
use rumqttc::{MqttOptions, Client, QoS};
use std::time::Duration;
use std::thread;
let mut mqttoptions = MqttOptions::new("rumqtt-sync", "test.mosquitto.org", 1883);
mqttoptions.set_keep_alive(Duration::from_secs(5));
let (mut client, mut connection) = Client::new(mqttoptions, 10);
client.subscribe("hello/rumqtt", QoS::AtMostOnce).unwrap();
thread::spawn(move || for i in 0..10 {
client.publish("hello/rumqtt", QoS::AtLeastOnce, false, vec![i; i as usize]).unwrap();
thread::sleep(Duration::from_millis(100));
});
// Iterate to poll the eventloop for connection progress
for (i, notification) in connection.iter().enumerate() {
println!("Notification = {:?}", notification);
}
A simple asynchronous publish and subscribe
use rumqttc::{MqttOptions, AsyncClient, QoS};
use tokio::{task, time};
use std::time::Duration;
use std::error::Error;
let mut mqttoptions = MqttOptions::new("rumqtt-async", "test.mosquitto.org", 1883);
mqttoptions.set_keep_alive(Duration::from_secs(5));
let (mut client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
client.subscribe("hello/rumqtt", QoS::AtMostOnce).await.unwrap();
task::spawn(async move {
for i in 0..10 {
client.publish("hello/rumqtt", QoS::AtLeastOnce, false, vec![i; i as usize]).await.unwrap();
time::sleep(Duration::from_millis(100)).await;
}
});
loop {
let notification = eventloop.poll().await.unwrap();
println!("Received = {:?}", notification);
}
Quick overview of features
- Eventloop orchestrates outgoing/incoming packets concurrently and hadles the state
- Pings the broker when necessary and detects client side half open connections as well
- Throttling of outgoing packets (todo)
- Queue size based flow control on outgoing packets
- Automatic reconnections by just continuing the
eventloop.poll()/connection.iter()
loop` - Natural backpressure to client APIs during bad network
- Immediate cancellation with
client.cancel()
In short, everything necessary to maintain a robust connection
Since the eventloop is externally polled (with iter()/poll()
in a loop)
out side the library and Eventloop
is accessible, users can
- Distribute incoming messages based on topics
- Stop it when required
- Access internal state for use cases like graceful shutdown or to modify options before reconnection
Important notes
-
Looping on
connection.iter()
/eventloop.poll()
is necessary to run the event loop and make progress. It yields incoming and outgoing activity notifications which allows customization as you see fit. -
Blocking inside the
connection.iter()
/eventloop.poll()
loop will block connection progress.
FAQ
Connecting to a broker using raw ip doesn’t work
You cannot create a TLS connection to a bare IP address with a self-signed
certificate. This is a limitation of rustls.
One workaround, which only works under *nix/BSD-like systems, is to add an
entry to wherever your DNS resolver looks (e.g. /etc/hosts
) for the bare IP
address and use that name in your code.
Modules
Structs
AsyncClient
to communicate with MQTT Eventloop
This is cloneable and can be used to asynchronously Publish, Subscribe.
Client
to communicate with MQTT eventloop Connection
.
Common configuration for (typically) all connections made by a program.
Acknowledgement to connect packet
Connection packet initiated by the client
MQTT connection. Maintains all the necessary state
Eventloop with all the state of a connection
Packet type from a byte
LastWill that broker forwards on behalf of the client
Options to configure the behaviour of mqtt connection
State of the mqtt connection.
Acknowledgement to QoS1 publish
Acknowledgement to QoS1 publish
Acknowledgement to QoS1 publish
Acknowledgement to QoS1 publish
Publish packet
An error returned from Sender::send()
.
The sending side of a channel.
Acknowledgement to subscribe
Subscription packet
Subscription filter
Acknowledgement to unsubscribe
Unsubscribe packet
Enums
Client Error
Return code in connack
Critical errors during eventloop polling
Error during serialization and deserialization
Events which can be yielded by the event loop
Key type for TLS authentication
url
Current outgoing activity on the eventloop
Encapsulates all MQTT packet types
MQTT packet type
Protocol type
Quality of service
Requests by the client to mqtt event loop. Request are handled one by one.
Errors during state handling
An error returned from Sender::try_send()
.
Functions
Extract all the certificates from rd, and return a vec of key::Certificate
s
containing the der-format contents.
Checks if the stream has enough bytes to frame a packet and returns fixed header
only if a packet can be framed with existing bytes in the stream
.
The passed stream doesn’t modify parent stream’s cursor. If this function
returned an error, next check
on the same parent stream is forced start
with cursor at 0 again (Iter is owned. Only Iter’s cursor is changed internally)
Checks if a topic or topic filter has wildcards
Checks if topic matches a filter. topic and filter validation isn’t done here.
Extract all PKCS8-encoded private keys from rd, and return a vec of
key::PrivateKey
s containing the der-format contents.
Maps a number to QoS
Reads a stream of bytes and extracts next MQTT packet out of it
Extract all RSA private keys from rd, and return a vec of key::PrivateKey
s
containing the der-format contents.
Checks if the filter is valid
Checks if a topic is valid